package com.atguigu.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.app.func.AsyncDimFunction;
import com.atguigu.bean.TradeTrademarkCategoryUserSpuOrderBean;
import com.atguigu.utils.DateFormatUtil;
import com.atguigu.utils.MyClickHouseUtil;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;

//数据流：web/app -> Nginx -> 业务服务器 -> Mysql(Binlog) -> Maxwell -> Kafka(ODS) -> FlinkApp -> Kafka(DWD) -> FlinkApp -> Kafka(DWD) -> FlinkApp -> ClickHouse(DWS)
//程  序：  Mock -> Mysql(Binlog) -> Maxwell -> Kafka(ZK) -> DwdTradeOrderPreProcess -> Kafka(ZK) -> DwdTradeOrderDetail -> Kafka(ZK) -> DwsTradeUserSpuOrderWindow(Phoenix(HBase HDFS ZK)-Redis) -> ClickHouse(ZK)
/**
 * DWS-layer Flink job that aggregates order-detail records per
 * (user, SPU, trademark, category3) over tumbling event-time windows and
 * writes the result to ClickHouse.
 *
 * <p>Pipeline, as built in {@link #main(String[])}:
 * <ol>
 *   <li>read the {@code dwd_trade_order_detail} Kafka topic;</li>
 *   <li>parse each record as JSON, dropping blank / malformed records;</li>
 *   <li>de-duplicate by order-detail {@code id}, keeping the record with the
 *       greatest {@code row_op_ts} seen within a short processing-time wait;</li>
 *   <li>convert to {@link TradeTrademarkCategoryUserSpuOrderBean} and assign
 *       event-time timestamps / watermarks from {@code create_time};</li>
 *   <li>asynchronously join {@code DIM_SKU_INFO} to obtain the grouping keys;</li>
 *   <li>key, window and reduce (sum of amounts, count of distinct order ids);</li>
 *   <li>asynchronously join the remaining dimension tables for display names;</li>
 *   <li>sink to the {@code dws_trade_user_spu_order_window} ClickHouse table.</li>
 * </ol>
 */
public class DwsTradeUserSpuOrderWindow {

    /** Processing-time wait (ms) after the first record of a detail id before the latest version is emitted. */
    private static final long DEDUP_WAIT_MS = 5000L;

    /** Maximum out-of-orderness tolerated by the watermark generator. */
    private static final Duration WATERMARK_DELAY = Duration.ofSeconds(2);

    /** Size of the tumbling event-time window, in seconds. */
    private static final long WINDOW_SIZE_SECONDS = 10L;

    /** Timeout of every asynchronous dimension lookup, in seconds. */
    private static final long ASYNC_TIMEOUT_SECONDS = 100L;

    /**
     * Builds the job graph and submits it.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {

        // Step 1: obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Checkpointing / state backend configuration (currently disabled):
//        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
//        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
//        env.getCheckpointConfig().enableExternalizedCheckpoints(
//                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
//        );
//        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
//        env.setRestartStrategy(
//                RestartStrategies.failureRateRestart(3, Time.days(1L), Time.minutes(3L))
//        );
//        env.setStateBackend(new HashMapStateBackend());
//        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/ck");
//        System.setProperty("HADOOP_USER_NAME", "atguigu");

        // Step 2: read the DWD order-detail topic from Kafka.
        String topic = "dwd_trade_order_detail";
        String groupId = "user_spu_order_window_211227";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));

        // Step 3: drop null records (already replaced by "" upstream) and parse the rest as JSON.
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                JSONObject parsed;
                try {
                    parsed = JSON.parseObject(value);
                } catch (Exception e) {
                    // Best effort: a record that cannot be parsed is dropped, but no longer silently.
                    System.err.println("Dropping unparsable record from " + "dwd_trade_order_detail" + ": " + value + " (" + e + ")");
                    return;
                }
                // parseObject can hand back null instead of throwing (e.g. for blank input);
                // forwarding null would make the downstream key selector fail with an NPE.
                if (parsed != null) {
                    out.collect(parsed);
                }
            }
        });

        // Step 4: key by order-detail id.
        KeyedStream<JSONObject, String> keyedByDetailIdDS = jsonObjDS.keyBy(json -> json.getString("id"));

        // Step 5: state + timer based de-duplication; keep the record with the greatest row_op_ts.
        SingleOutputStreamOperator<JSONObject> filterDS = keyedByDetailIdDS.process(new KeyedProcessFunction<String, JSONObject, JSONObject>() {

            /** Latest version of the order detail seen so far for the current key. */
            private ValueState<JSONObject> orderDetailState;

            @Override
            public void open(Configuration parameters) throws Exception {
                orderDetailState = getRuntimeContext().getState(new ValueStateDescriptor<JSONObject>("order-state", JSONObject.class));
            }

            @Override
            public void processElement(JSONObject value, Context ctx, Collector<JSONObject> out) throws Exception {

                JSONObject lastOrderDetail = orderDetailState.value();

                if (lastOrderDetail == null) {
                    // First record for this key: remember it and schedule the emission.
                    orderDetailState.update(value);
                    long currentPt = ctx.timerService().currentProcessingTime();
                    ctx.timerService().registerProcessingTimeTimer(currentPt + DEDUP_WAIT_MS);
                } else if (compareRowOpTs(value.getString("row_op_ts"), lastOrderDetail.getString("row_op_ts")) >= 0) {
                    // Later (or equally recent) version: it replaces the stored one.
                    orderDetailState.update(value);
                }
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<JSONObject> out) throws Exception {

                // Emit the retained record, then clear the state so the key can start over.
                JSONObject latest = orderDetailState.value();
                if (latest != null) {
                    out.collect(latest);
                }

                orderDetailState.clear();
            }
        });

        // Step 6: convert the JSON record to the aggregation bean.
        SingleOutputStreamOperator<TradeTrademarkCategoryUserSpuOrderBean> tradeTrademarkCategoryUserSpuOrderDS = filterDS.map(json -> {
            // Each record starts with a one-element set; sets are unioned in the window reduce.
            HashSet<String> orderIds = new HashSet<>();
            orderIds.add(json.getString("order_id"));

            return TradeTrademarkCategoryUserSpuOrderBean.builder()
                    .userId(json.getString("user_id"))
                    .skuId(json.getString("sku_id"))
                    .orderIdSet(orderIds)
                    .orderAmount(json.getDouble("split_total_amount"))
                    .ts(DateFormatUtil.toTs(json.getString("create_time"), true))
                    .build();
        });

        // Step 7: assign event-time timestamps and generate watermarks.
        SingleOutputStreamOperator<TradeTrademarkCategoryUserSpuOrderBean> beanWithWmDS = tradeTrademarkCategoryUserSpuOrderDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<TradeTrademarkCategoryUserSpuOrderBean>forBoundedOutOfOrderness(WATERMARK_DELAY)
                        .withTimestampAssigner(new SerializableTimestampAssigner<TradeTrademarkCategoryUserSpuOrderBean>() {
                            @Override
                            public long extractTimestamp(TradeTrademarkCategoryUserSpuOrderBean element, long recordTimestamp) {
                                return element.getTs();
                            }
                        }));

        // Step 8: join the SKU dimension to fill in the fields used for grouping.
        // A synchronous map-based lookup (open a connection pool, build and run one
        // query per record, copy the result into the bean) was sketched here before;
        // the asynchronous variant below is what the job actually uses.
        beanWithWmDS.print("beanWithWmDS>>>>>>>>>>>>");

        SingleOutputStreamOperator<TradeTrademarkCategoryUserSpuOrderBean> tradeWithSkuDS = AsyncDataStream.unorderedWait(
                beanWithWmDS,
                new AsyncDimFunction<TradeTrademarkCategoryUserSpuOrderBean>("DIM_SKU_INFO") {
                    @Override
                    public String getKey(TradeTrademarkCategoryUserSpuOrderBean input) {
                        return input.getSkuId();
                    }

                    @Override
                    public void join(TradeTrademarkCategoryUserSpuOrderBean input, JSONObject dimInfo) {
                        input.setSpuId(dimInfo.getString("SPU_ID")); //"spuId"
                        input.setTrademarkId(dimInfo.getString("TM_ID"));
                        input.setCategory3Id(dimInfo.getString("CATEGORY3_ID"));
                    }
                },
                ASYNC_TIMEOUT_SECONDS, TimeUnit.SECONDS);

        tradeWithSkuDS.print("tradeWithSkuDS>>>>>>>>>>");

        // Step 9: key by the aggregation granularity (user, SPU, trademark, category3).
        KeyedStream<TradeTrademarkCategoryUserSpuOrderBean, Tuple4<String, String, String, String>> keyedStream = tradeWithSkuDS.keyBy(new KeySelector<TradeTrademarkCategoryUserSpuOrderBean, Tuple4<String, String, String, String>>() {
            @Override
            public Tuple4<String, String, String, String> getKey(TradeTrademarkCategoryUserSpuOrderBean value) throws Exception {
                return new Tuple4<>(value.getUserId(),
                        value.getSpuId(),
                        value.getTrademarkId(),
                        value.getCategory3Id());
            }
        });

        // Step 10: tumbling event-time window + incremental aggregation.
        SingleOutputStreamOperator<TradeTrademarkCategoryUserSpuOrderBean> reduceDS = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
                .reduce(new ReduceFunction<TradeTrademarkCategoryUserSpuOrderBean>() {
                    @Override
                    public TradeTrademarkCategoryUserSpuOrderBean reduce(TradeTrademarkCategoryUserSpuOrderBean value1, TradeTrademarkCategoryUserSpuOrderBean value2) throws Exception {
                        // Accumulate into value1: sum the amounts and union the order-id sets.
                        value1.setOrderAmount(value1.getOrderAmount() + value2.getOrderAmount());
                        value1.getOrderIdSet().addAll(value2.getOrderIdSet());
                        return value1;
                    }
                }, new WindowFunction<TradeTrademarkCategoryUserSpuOrderBean, TradeTrademarkCategoryUserSpuOrderBean, Tuple4<String, String, String, String>, TimeWindow>() {
                    @Override
                    public void apply(Tuple4<String, String, String, String> key, TimeWindow window, Iterable<TradeTrademarkCategoryUserSpuOrderBean> input, Collector<TradeTrademarkCategoryUserSpuOrderBean> out) throws Exception {

                        // The first element of the iterable is taken as the reduced result of the window.
                        TradeTrademarkCategoryUserSpuOrderBean orderBean = input.iterator().next();

                        // ts is overwritten with the wall-clock time at which the window result is produced.
                        orderBean.setTs(System.currentTimeMillis());
                        orderBean.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
                        orderBean.setStt(DateFormatUtil.toYmdHms(window.getStart()));

                        // Order count = number of distinct order ids collected in this window.
                        orderBean.setOrderCount((long) orderBean.getOrderIdSet().size());

                        out.collect(orderBean);
                    }
                });

        // Step 11: join the dimensions that are not part of the grouping key (5 tables).
        // 11.1 SPU
        SingleOutputStreamOperator<TradeTrademarkCategoryUserSpuOrderBean> reduceWithSpuDS = AsyncDataStream.unorderedWait(
                reduceDS,
                new AsyncDimFunction<TradeTrademarkCategoryUserSpuOrderBean>("DIM_SPU_INFO") {
                    @Override
                    public String getKey(TradeTrademarkCategoryUserSpuOrderBean input) {
                        return input.getSpuId();
                    }

                    @Override
                    public void join(TradeTrademarkCategoryUserSpuOrderBean input, JSONObject dimInfo) {
                        input.setSpuName(dimInfo.getString("SPU_NAME"));
                    }
                }, ASYNC_TIMEOUT_SECONDS, TimeUnit.SECONDS);

        // 11.2 Trademark
        SingleOutputStreamOperator<TradeTrademarkCategoryUserSpuOrderBean> reduceWithTradeMarkDS = AsyncDataStream.unorderedWait(
                reduceWithSpuDS,
                new AsyncDimFunction<TradeTrademarkCategoryUserSpuOrderBean>("DIM_BASE_TRADEMARK") {
                    @Override
                    public String getKey(TradeTrademarkCategoryUserSpuOrderBean input) {
                        return input.getTrademarkId();
                    }

                    @Override
                    public void join(TradeTrademarkCategoryUserSpuOrderBean input, JSONObject dimInfo) {
                        input.setTrademarkName(dimInfo.getString("TM_NAME"));
                    }
                }, ASYNC_TIMEOUT_SECONDS, TimeUnit.SECONDS);

        // 11.3 Category3 (also yields the category2 id needed by the next join)
        SingleOutputStreamOperator<TradeTrademarkCategoryUserSpuOrderBean> reduceWithCategory3DS = AsyncDataStream.unorderedWait(
                reduceWithTradeMarkDS,
                new AsyncDimFunction<TradeTrademarkCategoryUserSpuOrderBean>("DIM_BASE_CATEGORY3") {
                    @Override
                    public String getKey(TradeTrademarkCategoryUserSpuOrderBean input) {
                        return input.getCategory3Id();
                    }

                    @Override
                    public void join(TradeTrademarkCategoryUserSpuOrderBean input, JSONObject dimInfo) {
                        input.setCategory2Id(dimInfo.getString("CATEGORY2_ID"));
                        input.setCategory3Name(dimInfo.getString("NAME"));
                    }
                }, ASYNC_TIMEOUT_SECONDS, TimeUnit.SECONDS);

        // 11.4 Category2 (also yields the category1 id needed by the next join)
        SingleOutputStreamOperator<TradeTrademarkCategoryUserSpuOrderBean> reduceWithCategory2DS = AsyncDataStream.unorderedWait(
                reduceWithCategory3DS,
                new AsyncDimFunction<TradeTrademarkCategoryUserSpuOrderBean>("DIM_BASE_CATEGORY2") {
                    @Override
                    public String getKey(TradeTrademarkCategoryUserSpuOrderBean input) {
                        return input.getCategory2Id();
                    }

                    @Override
                    public void join(TradeTrademarkCategoryUserSpuOrderBean input, JSONObject dimInfo) {
                        input.setCategory1Id(dimInfo.getString("CATEGORY1_ID"));
                        input.setCategory2Name(dimInfo.getString("NAME"));
                    }
                }, ASYNC_TIMEOUT_SECONDS, TimeUnit.SECONDS);

        // 11.5 Category1
        SingleOutputStreamOperator<TradeTrademarkCategoryUserSpuOrderBean> reduceWithCategory1DS = AsyncDataStream.unorderedWait(
                reduceWithCategory2DS,
                new AsyncDimFunction<TradeTrademarkCategoryUserSpuOrderBean>("DIM_BASE_CATEGORY1") {
                    @Override
                    public String getKey(TradeTrademarkCategoryUserSpuOrderBean input) {
                        return input.getCategory1Id();
                    }

                    @Override
                    public void join(TradeTrademarkCategoryUserSpuOrderBean input, JSONObject dimInfo) {
                        input.setCategory1Name(dimInfo.getString("NAME"));
                    }
                }, ASYNC_TIMEOUT_SECONDS, TimeUnit.SECONDS);

        // Step 12: write the result out.
        reduceWithCategory1DS.print("reduceWithCategory1DS>>>>>>>>>>>>>");
        reduceWithCategory1DS.addSink(MyClickHouseUtil.getSink("insert into dws_trade_user_spu_order_window values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"));

        // Step 13: launch the job.
        env.execute("DwsTradeUserSpuOrderWindow");

    }

    /**
     * Compares two {@code row_op_ts} strings chronologically.
     *
     * <p>A plain {@link String#compareTo(String)} gives the wrong order when the
     * two values carry a different number of fractional-second digits (for
     * example {@code "...:55.1Z"} vs {@code "...:55.12Z"}, or {@code "...:55Z"} vs
     * {@code "...:55.5Z"}), because the trailing {@code 'Z'} sorts after digits
     * and after {@code '.'}. This method strips an optional trailing {@code 'Z'},
     * compares the part before the first {@code '.'} lexicographically, and then
     * compares the fractional digits as if the shorter one were right-padded
     * with zeros. For two values of identical layout the result has the same
     * sign as {@code compareTo}.
     *
     * <p>NOTE(review): assumes the part before the fraction is fixed-width
     * (e.g. {@code yyyy-MM-dd HH:mm:ss}) -- confirm against the upstream producer.
     *
     * @param left  first timestamp string; {@code null} is treated as earliest
     * @param right second timestamp string; {@code null} is treated as earliest
     * @return a negative value, zero, or a positive value when {@code left} is
     *         earlier than, equal to, or later than {@code right}
     */
    private static int compareRowOpTs(String left, String right) {
        if (left == null || right == null) {
            if (left == null && right == null) {
                return 0;
            }
            return left == null ? -1 : 1;
        }

        String l = left.endsWith("Z") ? left.substring(0, left.length() - 1) : left;
        String r = right.endsWith("Z") ? right.substring(0, right.length() - 1) : right;

        int lDot = l.indexOf('.');
        int rDot = r.indexOf('.');

        String lHead = lDot < 0 ? l : l.substring(0, lDot);
        String rHead = rDot < 0 ? r : r.substring(0, rDot);
        int headOrder = lHead.compareTo(rHead);
        if (headOrder != 0) {
            return headOrder;
        }

        String lFraction = lDot < 0 ? "" : l.substring(lDot + 1);
        String rFraction = rDot < 0 ? "" : r.substring(rDot + 1);
        int width = Math.max(lFraction.length(), rFraction.length());
        for (int i = 0; i < width; i++) {
            // A missing digit counts as '0', i.e. ".1" is read as ".100".
            char lc = i < lFraction.length() ? lFraction.charAt(i) : '0';
            char rc = i < rFraction.length() ? rFraction.charAt(i) : '0';
            if (lc != rc) {
                return lc < rc ? -1 : 1;
            }
        }
        return 0;
    }

}
